12 ROS2 实践课-动作通信节点开发与调试

ROS2 动作通信节点开发与调试

关联:索引

  1. 场景提问:机械臂执行一条分拣路径要 15 秒,你如何知道它执行到哪一步?出现风险时你如何让它停下?

1. action 文件的基本结构

# Goal(目标)
<字段类型> <字段名>
...
---
# Result(结果)
<字段类型> <字段名>
...
---
# Feedback(反馈)
<字段类型> <字段名>
...

动作名建议:/sorting/arm/execute_path

# Goal
string task_id
string arm_id
string path_id
float32 speed
float32 timeout_sec
---
# Result
bool ok
int32 error_code
string message
string final_state
---
# Feedback
float32 progress
string current_step
string safety_state

1. 创建工作空间与接口包(推荐接口独立成包)

source /opt/ros/humble/setup.bash
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws/src
ros2 pkg create sorting_interfaces --build-type ament_cmake

# 放自定义 action 的目录
mkdir -p sorting_interfaces/action

说明:接口包选用 ament_cmake 是 ROS2 里生成 .srv/.msg/.action 的常见做法,不代表必须写 C++;动作服务器/客户端节点依然可以用 ament_python 开发。

建议目录:

# Goal
string task_id
string arm_id
string path_id
float32 speed
float32 timeout_sec
---
# Result
bool ok
int32 error_code
string message
string final_state
---
# Feedback
float32 progress
string current_step
string safety_state

package.xml(接口包最小可用模板,可直接替换包名/描述/维护者信息):

<?xml version="1.0"?>
<package format="3">
  <name>sorting_interfaces</name>
  <version>0.0.0</version>
  <description>Interface definitions for sorting robot</description>
  <maintainer email="[email protected]">teacher</maintainer>
  <license>Apache-2.0</license>

  <buildtool_depend>ament_cmake</buildtool_depend>

  <build_depend>rosidl_default_generators</build_depend>
  <exec_depend>rosidl_default_runtime</exec_depend>

  <member_of_group>rosidl_interface_packages</member_of_group>

  <export>
    <build_type>ament_cmake</build_type>
  </export>
</package>

CMakeLists.txt(接口包最小可用模板,把 action 文件名填进去):

cmake_minimum_required(VERSION 3.8)
project(sorting_interfaces)

find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(action_msgs REQUIRED)

set(action_files
  "action/ArmExecutePath.action"
)

rosidl_generate_interfaces(${PROJECT_NAME}
  ${action_files}
  DEPENDENCIES action_msgs
)

ament_export_dependencies(rosidl_default_runtime)
ament_package()

扩展提示:如果 action 里使用了 builtin_interfaces/Time 等外部类型,需要在 package.xml<depend>builtin_interfaces</depend>,并在 CMakeLists.txtfind_package(builtin_interfaces REQUIRED),同时在 rosidl_generate_interfaces(...) 里增加 DEPENDENCIES builtin_interfaces

3. 编译与环境刷新

cd ~/ros2_ws
colcon build --symlink-install
source /opt/ros/humble/setup.bash
source install/setup.bash

4. 生成结果验证(必须能用命令看到接口)

查看接口是否已注册:

ros2 interface list | grep sorting_interfaces

查看 action 具体字段:

ros2 interface show sorting_interfaces/action/ArmExecutePath

5. 小组产出(当堂提交)

  1. 快问快答:动作通信相比服务通信,多了哪两类关键信息?

1. 创建动作功能包(Python 示例)

source /opt/ros/humble/setup.bash
cd ~/ros2_ws/src
ros2 pkg create sorting_action_practice --build-type ament_python --dependencies rclpy sorting_interfaces

建议文件路径:

入口点配置(setup.py)示例结构:

entry_points={
    'console_scripts': [
        'execute_path_action_server = sorting_action_practice.execute_path_action_server:main',
        'execute_path_action_client = sorting_action_practice.execute_path_action_client:main',
    ],
},

实现代码(可直接复制):

动作服务器:execute_path_action_server.py

import time

import rclpy
from rclpy.action import ActionServer
from rclpy.action import CancelResponse
from rclpy.action import GoalResponse
from rclpy.node import Node

from sorting_interfaces.action import ArmExecutePath

class ExecutePathActionServer(Node):
    def __init__(self) -> None:
        super().__init__('execute_path_action_server')
        self._action_server = ActionServer(
            self,
            ArmExecutePath,
            '/sorting/arm/execute_path',
            execute_callback=self.execute_callback,
            goal_callback=self.goal_callback,
            cancel_callback=self.cancel_callback,
        )

    def goal_callback(self, goal_request: ArmExecutePath.Goal) -> GoalResponse:
        if not goal_request.task_id or not goal_request.arm_id:
            return GoalResponse.REJECT
        if goal_request.timeout_sec <= 0.0:
            return GoalResponse.REJECT
        return GoalResponse.ACCEPT

    def cancel_callback(self, goal_handle) -> CancelResponse:
        return CancelResponse.ACCEPT

    def execute_callback(self, goal_handle):
        feedback = ArmExecutePath.Feedback()

        total_steps = 20
        start_time = time.time()

        for i in range(total_steps):
            if goal_handle.is_cancel_requested:
                goal_handle.canceled()
                result = ArmExecutePath.Result()
                result.ok = False
                result.error_code = 2002
                result.message = 'canceled by request'
                result.final_state = 'CANCELED'
                return result

            elapsed = time.time() - start_time
            if elapsed > float(goal_handle.request.timeout_sec):
                goal_handle.abort()
                result = ArmExecutePath.Result()
                result.ok = False
                result.error_code = 2003
                result.message = 'timeout on server'
                result.final_state = 'TIMEOUT'
                return result

            feedback.progress = float(i + 1) / float(total_steps)
            feedback.current_step = f'step_{i + 1}'
            feedback.safety_state = 'OK'
            goal_handle.publish_feedback(feedback)
            time.sleep(0.2)

        goal_handle.succeed()
        result = ArmExecutePath.Result()
        result.ok = True
        result.error_code = 0
        result.message = 'completed'
        result.final_state = 'SUCCEEDED'
        return result

def main(args=None) -> None:
    rclpy.init(args=args)
    node = ExecutePathActionServer()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()

动作客户端:execute_path_action_client.py

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from sorting_interfaces.action import ArmExecutePath

class ExecutePathActionClient(Node):
    def __init__(self) -> None:
        super().__init__('execute_path_action_client')
        self._client = ActionClient(self, ArmExecutePath, '/sorting/arm/execute_path')

    def feedback_callback(self, feedback_msg) -> None:
        feedback: ArmExecutePath.Feedback = feedback_msg.feedback
        self.get_logger().info(
            f'feedback: progress={feedback.progress:.2f}, step={feedback.current_step}, safety={feedback.safety_state}'
        )

    def run(self) -> None:
        if not self._client.wait_for_server(timeout_sec=3.0):
            self.get_logger().error('action server not available: /sorting/arm/execute_path')
            return

        goal = ArmExecutePath.Goal()
        goal.task_id = 't001'
        goal.arm_id = 'arm_1'
        goal.path_id = 'p01'
        goal.speed = 0.5
        goal.timeout_sec = 10.0

        send_goal_future = self._client.send_goal_async(goal, feedback_callback=self.feedback_callback)
        rclpy.spin_until_future_complete(self, send_goal_future, timeout_sec=3.0)
        if not send_goal_future.done() or send_goal_future.exception() is not None:
            self.get_logger().error('failed to send goal')
            return

        goal_handle = send_goal_future.result()
        if not goal_handle.accepted:
            self.get_logger().error('goal rejected')
            return

        self.get_logger().info('goal accepted')
        get_result_future = goal_handle.get_result_async()
        rclpy.spin_until_future_complete(self, get_result_future, timeout_sec=15.0)
        if not get_result_future.done() or get_result_future.exception() is not None:
            self.get_logger().error('failed to get result')
            return

        result = get_result_future.result().result
        self.get_logger().info(
            f'result: ok={result.ok}, error_code={result.error_code}, final_state={result.final_state}, message={result.message}'
        )

def main(args=None) -> None:
    rclpy.init(args=args)
    node = ExecutePathActionClient()
    try:
        node.run()
    finally:
        node.destroy_node()
        rclpy.shutdown()

目标:在“路径执行”动作跑通后,做一个更贴近生产的“抓取任务”动作,体会阶段反馈与取消的价值。

1. 编译与运行(先服务端后客户端)

cd ~/ros2_ws
colcon build --symlink-install

# 每个新终端都必须 source,否则看不到接口与可执行入口
source /opt/ros/humble/setup.bash
source install/setup.bash

终端 A(启动动作服务器):

ros2 run sorting_action_practice execute_path_action_server

终端 B(查看动作列表与类型):

ros2 action list
ros2 action info /sorting/arm/execute_path
ros2 interface show sorting_interfaces/action/ArmExecutePath

成功调用示例(期望:Goal accepted,持续看到 feedback,最终 result=SUCCEEDED):

ros2 action send_goal /sorting/arm/execute_path sorting_interfaces/action/ArmExecutePath "{task_id: 't001', arm_id: 'arm_1', path_id: 'p01', speed: 0.5, timeout_sec: 10.0}" --feedback

失败调用示例(Goal 被拒绝,期望:Goal rejected;理由:timeout_sec<=0):

ros2 action send_goal /sorting/arm/execute_path sorting_interfaces/action/ArmExecutePath "{task_id: 't002', arm_id: 'arm_1', path_id: 'p01', speed: 0.5, timeout_sec: 0.0}" --feedback

失败调用示例(服务端超时中止,期望:result=TIMEOUT):

ros2 action send_goal /sorting/arm/execute_path sorting_interfaces/action/ArmExecutePath "{task_id: 't003', arm_id: 'arm_1', path_id: 'p01', speed: 0.5, timeout_sec: 0.1}" --feedback

先发送一个会执行更久的目标(例如把 timeout_sec 设得更大),然后在另一个终端取消:

ros2 action list
ros2 action cancel /sorting/arm/execute_path

1. 接口定义/生成相关(编译时报错)

  1. 确认 action/*.action 文件格式正确(必须有两条 --- 分隔)
  2. 确认 colcon build 无报错
  3. 确认每个终端执行过 source install/setup.bash

2. Action Server 不可用(运行时最常见)

  1. ros2 node list 确认服务端节点是否存在
  2. ros2 action list 确认动作名是否出现(应包含 /sorting/arm/execute_path
  3. ros2 action info /sorting/arm/execute_path 确认类型是否为 sorting_interfaces/action/ArmExecutePath
  4. 确认两端 ROS_DOMAIN_ID 一致,且都 source 了同一个工作空间 install

2. 反馈无数据

  1. 确认服务端确实在执行循环里发布 feedback(不是只算完一次性返回)
  2. 确认 feedback 发布频率与执行节奏匹配(不要 sleep 太久或根本没 publish)
  3. 确认客户端/CLI 使用了 --feedback,且动作名/类型一致

3. 任务取消失败

  1. 服务端是否实现 cancel 回调与取消分支(检查是否判断 cancel 请求并退出执行循环)
  2. 取消后是否返回一致的 result(final_state/error_code/message)
  3. 执行逻辑是否存在不可中断的阻塞段(长时间计算/外部调用未设置超时)

提示词模板(可直接复制):

你是 ROS2 Humble 的动作通信开发与调试专家。请基于以下需求输出可运行的实现与自测方法。
需求:机械臂分拣任务动作(长时任务)
- 接口规范:请先用 5 条要点总结 action 接口设计规范(goal/feedback/result/取消语义/验收证据)
- 自定义 action:请给出 .action 内容(字段要可追溯、可验收)
- Python rclpy:请给出动作服务器与客户端完整代码(关键逻辑处添加简短注释:发布反馈、处理取消、结果字段含义)
- 约束:必须周期发布 feedback;必须支持 cancel;必须区分 success/failed/canceled 三类 result
- 自测:给出 ros2 interface show、ros2 action list/info/send_goal/cancel 的命令与期望关键输出(至少包含 1 次成功与 1 次取消)
- 排错:如果出现“反馈无数据 / 任务取消失败 / colcon 编译失败”,请按“现象→原因→定位命令→修复步骤→复验命令”输出
注意:不要假设任何文件已经存在;请给出推荐的包划分与文件路径(interfaces 包与节点包);涉及依赖必须在 package.xml / CMakeLists.txt 或 setup.py 中体现;不要编造不存在的依赖/包名(例如把 action API 当作独立的 Python 依赖包)。